Springboot RabbitTemplate遇到replyCode=312,replyText=NO 您所在的位置:网站首页 rabbittemplate convertandsend Springboot RabbitTemplate遇到replyCode=312,replyText=NO

Springboot RabbitTemplate遇到replyCode=312,replyText=NO

2023-04-06 02:35| 来源: 网络整理| 查看: 265

Springboot RabbitTemplate遇到replyCode=312,replyText=NO_ROUTE 问题描述

在项目中遇到当调用SpringBoot的RabbitTemplate.convertAndSend(String queue, String msg)方法之后,发现消息没有进入队列,同时触发了MQ生产者的ReturnCallBack方法,发现消息确实在发送端出现了问题。

问题原因和解决办法

话不多说直接上官方提供的解决方法。

原因

官方的解释1:

It causes some kind of deadlock down in the amqp-client code. The simplest solution is to do the send on a separate thread - use a TaskExecutor within the callback...

官方的解释2:

the callback is invoked directly on the client's connection I/O thread; to do the send, we open a new channel and wait for the Channel.Open-OK result (which is sent by the broker), but now the I/O thread that would handle that result is blocked waiting for that to happen. It does actually end up with a timeout exception after 10 minutes!

大致意思由于rabbitmq client bug导致客户端内部出现线程死锁导致消息没有进入到队列中,并且在解释中也告知了解决办法,就是可以通过另起一个线程来从新发送可以解决。

解决办法

1.官方确认,并且在2.2.x版本中修复了问题,因此可以通过升级rabbitmq client到2.2.x版本解决此问题

2.在returnCallBack()内部从新起一个线程使用原来的exchange和routing再次发送一遍replyCode=312的消息,即可成功发送消息到队列中 问题描述和启动新的线程来解决

解决问题

下面是我在项目中的部分代码

原先代码和配置 2021-03-08 13:33:42.243 INFO 7817 --- [ntLoopGroup-3-2] c.a.s.ws.TextWebSocketFrameHandler : Message content:type:heartbeat , endpoint:603eea3d660767905c1686af , time: 2021-03-08T13:33:42.226357661+08:00 . 2021-03-08 13:33:42.335 INFO 7817 --- [ntLoopGroup-3-1] c.a.s.ws.TextWebSocketFrameHandler : Message content:type:heartbeat , endpoint:603f489b660767905c1a958e , time: 2021-03-08T13:33:42.333921624+08:00 . 2021-03-08 13:33:42.435 INFO 7817 --- [cTaskExecutor-1] com.Application : Handle task: {"content":"ls","execId":5567,"id":"9257bb4f-2e75-4bd8-b3a4-f2b63c40261f","nodeId":"603f489b660767905c1a958e","resourceName":"[192.168.223.2, 172.17.0.1]","type":"adhoc_command"} 2021-03-08 13:33:42.436 INFO 7817 --- [cTaskExecutor-1] com.Application : Sending adhoc_command task 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f 2021-03-08 13:33:42.444 INFO 7817 --- [ntLoopGroup-3-1] c.a.s.ws.TextWebSocketFrameHandler : Message content:type:task , endpoint:603f489b660767905c1a958e , time: 2021-03-08T13:33:42.442484649+08:00 . 2021-03-08 13:33:42.445 INFO 7817 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler : Task 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f executed: 0 false tResult is [192.168.223.2, 172.17.0.1]: agent 2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 消息主体 message : (Body:'{"code":0,"completed":false,"output":"[192.168.223.2, 172.17.0.1]: agent","taskId":"9257bb4f-2e75-4bd8-b3a4-f2b63c40261f"}' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的consume tag:null 2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 消息返回码 replyCode: 312 2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 描述:NO_ROUTE 2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的交换器 exchange : 2021-03-08 13:33:42.453 ERROR 7817 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的路由键 routing : 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f 2021-03-08 13:33:42.456 INFO 7817 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler : amqpTemplate send message to queue 9257bb4f-2e75-4bd8-b3a4-f2b63c40261f message body is {"code":0,"completed":false,"output":"[192.168.223.2, 172.17.0.1]: agent","taskId":"9257bb4f-2e75-4bd8-b3a4-f2b63c40261f"} 复制代码

发送消息的代码

@Override protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception { val packet = JSON.parseObject(JSON.toJSONString(msg), new TypeReference() { }); val tResult = packet.getData(); log.info("Task {} executed: {} {} tResult is {}", tResult.getTaskId(), tResult.getCode(), tResult.isCompleted(), tResult.getOutput()); rabbitDao.convertAndSend(tResult.getTaskId(), JSON.toJSONString(tResult)); log.info("amqpTemplate send message to queue {} message body is {}", tResult.getTaskId(), JSON.toJSONString(tResult)); } 复制代码

生产者MQ配置

import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; /** * Rabbit mq 数据访问类 */ @Slf4j @Component public class RabbitDao implements RabbitTemplate.ReturnCallback { private static final int RABBITMQ_312_CODE = 312; @Autowired private RabbitTemplate rabbitTemplate; private RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { String msg = String.format("producer send message to exchange confirm callback error message id is %s reason is %s", correlationData.getReturnedMessage().getMessageProperties().getMessageId(), cause); log.error(msg); } } }; public void convertAndSend(String queueName, Object message) { //默认不开启,当消息成功到达exchange的时候,发现没有绑定队列的回调,仅在出现问题时候触发 rabbitTemplate.setReturnCallback(this); //默认不开启,用来确认消息是否到达exchange的回调 rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.convertAndSend(queueName, message); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息主体 message : " + message); log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag()); log.error("消息返回码 replyCode: " + replyCode); log.error("描述:" + replyText); log.error("消息使用的交换器 exchange : " + exchange); log.error("消息使用的路由键 routing : " + routingKey); } } 复制代码

springboot配置文件

##打开confirmcallback和returncallback spring: cloud: rabbitmq: publisher-confirms: true publisher-returns: true 复制代码 更新后代码

根据官方的描述,那么我们可以判断当直接新起一个线程来进行处理即可

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息主体 message : " + message); log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag()); log.error("消息返回码 replyCode: " + replyCode); log.error("描述:" + replyText); log.error("消息使用的交换器 exchange : " + exchange); log.error("消息使用的路由键 routing : " + routingKey); if(replyCode == RABBITMQ_312_CODE){ CompletableFuture.runAsync(()->{ log.info("retry send message one more time when trigger ReturnCallback message"); rabbitTemplate.convertAndSend(exchange, routingKey, message.getBody()); log.info("retry send message exchange is {}", exchange); log.info("retry send message routingKey is {}", routingKey); log.info("retry send message body is {}", new String(message.getBody(), StandardCharsets.UTF_8)); }); } } 复制代码

查看修复后的日志

2021-03-08 17:47:03.697 INFO 2698 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler : Task b617e6c5-ebdb-40a5-8c13-d818b2018188 executed: 0 false tResult is shahy-test-resetPassword-1: 123 2021-03-08 17:47:03.698 INFO 2698 --- [ntLoopGroup-3-1] c.ws.TaskResultHandler : amqpTemplate send message to queue b617e6c5-ebdb-40a5-8c13-d818b2018188 message body is {"code":0,"completed":false,"output":"shahy-test-resetPassword-1: 123","taskId":"b617e6c5-ebdb-40a5-8c13-d818b2018188"} 2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 消息主体 message : (Body:'{"code":0,"completed":false,"output":"shahy-test-resetPassword-1: 123","taskId":"b617e6c5-ebdb-40a5-8c13-d818b2018188"}' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的consume tag:null 2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 消息返回码 replyCode: 312 2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 描述:NO_ROUTE 2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的交换器 exchange : 2021-03-08 17:47:03.701 ERROR 2698 --- [.168.223.5:5672] com.dao.RabbitDao : 消息使用的路由键 routing : b617e6c5-ebdb-40a5-8c13-d818b2018188 2021-03-08 17:47:03.741 INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao : retry send message one more time when trigger ReturnCallback message 2021-03-08 17:47:03.742 INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao : retry send message exchange is 2021-03-08 17:47:03.742 INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao : retry send message routingKey is b617e6c5-ebdb-40a5-8c13-d818b2018188 2021-03-08 17:47:03.742 INFO 2698 --- [onPool-worker-1] com.dao.RabbitDao : retry send message body is {"code":0,"completed":false,"output":"shahy-test-resetPassword-1: 123","taskId":"b617e6c5-ebdb-40a5-8c13-d818b2018188"} 复制代码

通过日志我们可以看到,当触发returnCallBack()方法之后,当replyCode=312的时候,消息被再次处理并且是一个新的线程。最终问题也解决了,消费着可以拿到最终的消息。

更优雅的解决方式 1.直接起一个异步线程

直接起一个线程,在一定程度上还是会有风险假如遇到的问题很多,那么我们就需要在短时间内创建很多线程,有可能造成负载的风险;因此我们可以考虑通过一个线程池来实现,这里就不写全部实现了可以参考一下代码

exec.execute(() -> template.send(...)); 复制代码 2.通过注入一个ApplicationRunner的方式来创建线程 @Bean public ApplicationRunner runner(RabbitTemplate rabbitTemplate) { rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.error("消息主体 message : " + message); log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag()); log.error("消息返回码 replyCode: " + replyCode); log.error("描述:" + replyText); log.error("消息使用的交换器 exchange : " + exchange); log.error("消息使用的路由键 routing : " + routingKey); if(replyCode == 312){ CompletableFuture.runAsync(()->{ log.info("retry send message one more time when trigger ReturnCallback message"); rabbitTemplate.convertAndSend(exchange, routingKey, message.getBody()); log.info("retry send message exchange is {}", exchange); log.info("retry send message routingKey is {}", routingKey); log.info("retry send message body is {}", new String(message.getBody(), StandardCharsets.UTF_8)); }); } }); return args -> {log.info("retry send message one more time when trigger ReturnCallback message with replyCode 312");}; } 复制代码 3.升级版本解决

升级springboot版本到2.2.x以上版本,并且采用官方提供的AsyncRabbitTemplate来实现,我选择使用springboot2.2.0版本,spring-amqp 2.2.13版本

buildscript { ext { springBootVersion = '2.2.0.RELEASE' } } dependencies { implementation 'org.springframework.amqp:spring-amqp:2.2.13.RELEASE' implementation 'org.springframework.amqp:spring-rabbit:2.2.13.RELEASE' implementation 'org.springframework.boot:spring-boot-starter-amqp:2.2.13.RELEASE' } 复制代码

RabbitTemplate配置

@Slf4j @Component public class RabbitDao{ private static final int RABBITMQ_312_CODE = 312; @Autowired private RabbitTemplate rabbitTemplate; private AsyncRabbitTemplate asyncRabbitTemplate(){ //默认不开启,当消息成功到达exchange的时候,发现没有绑定队列的回调,仅在出现问题时候触发 rabbitTemplate.setReturnCallback(returnCallback); //默认不开启,用来确认消息是否到达exchange的回调 rabbitTemplate.setConfirmCallback(confirmCallback); AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate); asyncRabbitTemplate.start(); return asyncRabbitTemplate; } private RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { String msg = String.format("producer send message to exchange confirm callback error message id is %s reason is %s", correlationData.getReturnedMessage().getMessageProperties().getMessageId(), cause); log.error(msg); } } }; private RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息主体 message : " + message); log.error("消息使用的consume tag:" + message.getMessageProperties().getConsumerTag()); log.error("消息返回码 replyCode: " + replyCode); log.error("描述:" + replyText); log.error("消息使用的交换器 exchange : " + exchange); log.error("消息使用的路由键 routing : " + routingKey); if(replyCode == RABBITMQ_312_CODE){ CompletableFuture.runAsync(()->{ log.info("retry send message one more time when trigger ReturnCallback message"); rabbitTemplate.convertAndSend(exchange, routingKey, message.getBody()); log.info("retry send message exchange is {}", exchange); log.info("retry send message routingKey is {}", routingKey); log.info("retry send message body is {}", new String(message.getBody(), StandardCharsets.UTF_8)); }); } } }; public void convertAndSend(String queueName, Object message) { asyncRabbitTemplate().convertSendAndReceive(queueName, message); } } 复制代码

生产者发送消息

protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception { val packet = JSON.parseObject(JSON.toJSONString(msg), new TypeReference() { }); val tResult = packet.getData(); log.info("Task {} executed: {} {} tResult is {}", tResult.getTaskId(), tResult.getCode(), tResult.isCompleted(), tResult.getOutput()); rabbitDao.convertAndSend(tResult.getTaskId(), JSON.toJSONString(tResult)); log.info("amqpTemplate send message to queue {} message body is {}", tResult.getTaskId(), JSON.toJSONString(tResult)); } 复制代码


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有